VARMA (Vector Autoregressive Moving Average)#
Goals#
Understand the difference between VAR, VARMA, and VARMAX.
Build intuition for the moving-average (MA) part as shock memory / colored noise.
See the tradeoffs (parameter count, estimation cost) and identifiability pitfalls.
Implement simulation, forecasting, and shock propagation (impulse responses) in NumPy.
Visualize multivariate forecasts and shock propagation with Plotly.
Prerequisites#
Linear algebra (matrices, eigenvalues)
Basic time series: stationarity, innovations (white noise)
1) VAR vs VARMA vs VARMAX (precise definitions)#
Let (\mathbf{y}_t\in\mathbb{R}^k) be a k-variate time series.
VAR(p)#
A Vector Autoregression uses only lagged values of (\mathbf{y}):

$$\mathbf{y}_t = c + A_1 \mathbf{y}_{t-1} + \dots + A_p \mathbf{y}_{t-p} + \varepsilon_t, \qquad \varepsilon_t \sim \mathrm{WN}(\mathbf{0}, \Sigma).$$
(A_i\in\mathbb{R}^{k\times k}) couples variables across lags.
Noise is white by assumption: no serial correlation left in (\varepsilon_t).
VARMA(p,q)#
A Vector ARMA adds a moving-average part in past innovations:

$$\mathbf{y}_t = c + \sum_{i=1}^{p} A_i \mathbf{y}_{t-i} + \varepsilon_t + \sum_{j=1}^{q} M_j \varepsilon_{t-j}.$$
The MA matrices (M_j\in\mathbb{R}^{k\times k}) mean past shocks echo forward.
Special cases:
VAR(p) is VARMA(p,0)
VMA(q) is VARMA(0,q)
VARMAX(p,q,r)#
A VARMAX further adds exogenous inputs (\mathbf{x}_t\in\mathbb{R}^m) (covered in the next folder):

$$\mathbf{y}_t = c + \sum_{i=1}^{p} A_i \mathbf{y}_{t-i} + \sum_{\ell=0}^{r} B_\ell \mathbf{x}_{t-\ell} + \varepsilon_t + \sum_{j=1}^{q} M_j \varepsilon_{t-j}.$$
(B_\ell\in\mathbb{R}^{k\times m}) maps input features into the k outputs.
Lag-operator matrix form#
Define the matrix polynomials

$$A(L) = I_k - A_1 L - \dots - A_p L^p, \qquad M(L) = I_k + M_1 L + \dots + M_q L^q,$$

where (L) is the lag operator ((L\mathbf{y}_t = \mathbf{y}_{t-1})). Then VARMA is:

$$A(L)\,\mathbf{y}_t = c + M(L)\,\varepsilon_t.$$
2) What the MA extension means#
In a VAR, we assume (\varepsilon_t) is white noise. If reality has shock persistence (e.g., microstructure noise, aggregation effects, measurement frictions), the residuals from a plain VAR can remain autocorrelated.
A VARMA allows the innovation to be filtered before it fully “hits” (\mathbf{y}):
AR part ((A_i)): feedback / propagation through state dynamics.
MA part ((M_j)): short-memory filter on shocks (a finite impulse response on (\varepsilon)).
A stable VARMA has an infinite moving-average (Wold) representation:

$$\mathbf{y}_t = \mu + \sum_{h=0}^{\infty} \Psi_h\,\varepsilon_{t-h}.$$

The impulse response matrices (\Psi_h\in\mathbb{R}^{k\times k}) satisfy the recursion

$$\Psi_0 = I, \qquad \Psi_h = M_h + \sum_{i=1}^{\min(p,h)} A_i\,\Psi_{h-i} \quad \text{for } h\ge 1,$$

with (M_h = 0) for (h > q). For example, (\Psi_1 = A_1 + M_1).
So shock propagation is literally (\Psi_h): a unit shock in component (j) produces the response (\Psi_h\,\mathbf{e}_j) after (h) steps.
3) Complexity tradeoffs#
For (k) variables:
VAR(p) parameters (ignoring (\Sigma)): (p k^2) coefficients (plus (k) intercepts).
VARMA(p,q) parameters: ((p+q) k^2) coefficients (plus (k) intercepts).
Plus the innovation covariance (\Sigma) with (k(k+1)/2) free parameters. A quick tabulation follows the tradeoffs list below.
Tradeoffs:
VAR is easy: OLS per equation (or stacked multivariate OLS).
VARMA is harder: estimation usually requires state space + Kalman filter and maximum likelihood.
VARMA can be more parsimonious than a high-order VAR (fewer parameters for similar autocovariance structure), but only if you can estimate it reliably.
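To make these counts concrete, here is a tiny sketch (the helper name varma_param_count is ours, not a library function) that tabulates the total parameter count: conditional-mean coefficients plus intercepts plus (\Sigma):

def varma_param_count(k: int, p: int, q: int, intercept: bool = True) -> int:
    """Total free parameters: (p+q)k^2 coefficients, k intercepts, k(k+1)/2 in Sigma."""
    mean = (p + q) * k * k + (k if intercept else 0)
    return mean + k * (k + 1) // 2

for k_, p_, q_ in [(2, 2, 1), (5, 4, 0), (5, 1, 1)]:
    print(f"k={k_}: VARMA({p_},{q_}) has", varma_param_count(k_, p_, q_), "parameters")

At (k=5), a VAR(4) already carries 120 parameters, while a VARMA(1,1) carries 70: the parsimony argument in concrete numbers.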
4) Identifiability (why VARMA is tricky)#
In multivariate ARMA, parameters are not uniquely identified without additional restrictions.
Core issues:
AR/MA cancellation: if (A(L)) and (M(L)) share a common left factor, the same stochastic process can be represented with different ((A,M)).
Stability / stationarity: require (\det A(z) \neq 0) for (|z|\le 1) (no roots inside/on the unit circle).
Invertibility: require (\det M(z) \neq 0) for (|z|\le 1), so that innovations are recoverable from observables (a numerical check is sketched next to the stability helper in the code below).
Practical consequences:
Estimation often enforces canonical forms (e.g., echelon form) or uses a minimal state-space realization.
Small samples + many parameters can produce unstable or non-invertible fits.
In the code below we focus on:
simulation (where parameters are known),
impulse responses and forecasts given parameters,
and VAR fitting as a baseline.
import sys
import numpy as np
import plotly
import plotly.graph_objects as go
import os
import plotly.io as pio
from plotly.subplots import make_subplots
pio.templates.default = "plotly_white"
pio.renderers.default = os.environ.get("PLOTLY_RENDERER", "notebook")
np.set_printoptions(precision=4, suppress=True)
SEED = 42
rng = np.random.default_rng(SEED)
print("Python:", sys.version.split()[0])
print("NumPy:", np.__version__)
print("Plotly:", plotly.__version__)
Python: 3.12.9
NumPy: 1.26.2
Plotly: 6.5.2
def var_companion(A_list: list[np.ndarray]) -> np.ndarray:
"""Companion matrix for VAR(p) stability checks.
For y_t = sum_{i=1}^p A_i y_{t-i} + e_t.
Returns: (k*p, k*p) companion matrix.
"""
p = len(A_list)
if p == 0:
raise ValueError("Need at least one A matrix")
k = A_list[0].shape[0]
for A in A_list:
if A.shape != (k, k):
raise ValueError("All A_i must be (k,k)")
top = np.concatenate(A_list, axis=1)
if p == 1:
return top
eye = np.eye(k * (p - 1))
bottom = np.concatenate([eye, np.zeros((k * (p - 1), k))], axis=1)
return np.concatenate([top, bottom], axis=0)
def is_var_stable(A_list: list[np.ndarray]) -> bool:
F = var_companion(A_list)
eig = np.linalg.eigvals(F)
return np.max(np.abs(eig)) < 1.0
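is_var_stable covers only the AR side of the section-4 conditions. The sketch below (our addition, not part of any library) reuses var_companion for the invertibility condition: writing M(L) = I + M_1 L + ... + M_q L^q = I - (-M_1)L - ... - (-M_q)L^q, invertibility holds iff the companion matrix built from -M_1, ..., -M_q has spectral radius below 1.

def is_vma_invertible(M_list: list[np.ndarray]) -> bool:
    """Invertibility check for the MA polynomial M(L) = I + M_1 L + ... + M_q L^q.

    Reuses the AR companion construction with A_i := -M_i.
    """
    F = var_companion([-M for M in M_list])
    return bool(np.max(np.abs(np.linalg.eigvals(F))) < 1.0)

For the (M_1) used in section 5 (lower triangular with diagonal 0.6 and 0.3), is_vma_invertible([M1]) returns True.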
def simulate_varma(
n: int,
A_list: list[np.ndarray],
M_list: list[np.ndarray],
c: np.ndarray | None = None,
Sigma: np.ndarray | None = None,
burnin: int = 200,
seed: int = 0,
) -> tuple[np.ndarray, np.ndarray]:
"""Simulate a VARMA(p,q):
y_t = c + sum_i A_i y_{t-i} + e_t + sum_j M_j e_{t-j}
Returns:
y: (n, k)
e: (n, k) innovations used
"""
rng_local = np.random.default_rng(seed)
p = len(A_list)
q = len(M_list)
if p == 0 and q == 0:
raise ValueError("Need p>0 or q>0")
k = (A_list[0].shape[0] if p > 0 else M_list[0].shape[0])
for A in A_list:
if A.shape != (k, k):
raise ValueError("All A_i must be (k,k)")
for M in M_list:
if M.shape != (k, k):
raise ValueError("All M_j must be (k,k)")
if c is None:
c = np.zeros(k)
c = np.asarray(c, dtype=float)
if c.shape != (k,):
raise ValueError("c must be (k,)")
if Sigma is None:
Sigma = np.eye(k)
Sigma = np.asarray(Sigma, dtype=float)
if Sigma.shape != (k, k):
raise ValueError("Sigma must be (k,k)")
max_lag = max(p, q)
T = burnin + n + max_lag
L = np.linalg.cholesky(Sigma)
e = rng_local.standard_normal((T, k)) @ L.T
y = np.zeros((T, k), dtype=float)
for t in range(max_lag, T):
yt = c.copy()
for i, A in enumerate(A_list, start=1):
yt += A @ y[t - i]
yt += e[t]
for j, M in enumerate(M_list, start=1):
yt += M @ e[t - j]
y[t] = yt
sl = slice(burnin + max_lag, burnin + max_lag + n)
return y[sl], e[sl]
def var_ols_fit(y: np.ndarray, p: int, add_intercept: bool = True) -> dict:
"""Fit VAR(p) by multivariate OLS: Y = X B + E."""
y = np.asarray(y, dtype=float)
if y.ndim != 2:
raise ValueError("y must be (T,k)")
T, k = y.shape
if T <= p:
raise ValueError("Need T > p")
# Build design matrix X and target Y
Y = y[p:]
X_blocks = []
if add_intercept:
X_blocks.append(np.ones((T - p, 1)))
for lag in range(1, p + 1):
X_blocks.append(y[p - lag : T - lag])
X = np.concatenate(X_blocks, axis=1)
B_hat, *_ = np.linalg.lstsq(X, Y, rcond=None)
Y_hat = X @ B_hat
E = Y - Y_hat
c_hat = B_hat[0] if add_intercept else np.zeros(k)
A_hat = []
offset = 1 if add_intercept else 0
for i in range(p):
A_hat.append(B_hat[offset + i * k : offset + (i + 1) * k].T)
return {
'c': c_hat,
'A': A_hat,
'resid': E,
'Sigma': (E.T @ E) / (T - p),
'X': X,
'Y': Y,
}
def varma_irf(A_list: list[np.ndarray], M_list: list[np.ndarray], steps: int) -> np.ndarray:
"""Impulse response matrices Psi_0..Psi_steps for VARMA(p,q)."""
p = len(A_list)
q = len(M_list)
k = (A_list[0].shape[0] if p > 0 else M_list[0].shape[0])
Psi = np.zeros((steps + 1, k, k))
Psi[0] = np.eye(k)
for h in range(1, steps + 1):
Mh = M_list[h - 1] if (1 <= h <= q) else np.zeros((k, k))
acc = Mh.copy()
for i in range(1, min(p, h) + 1):
acc += A_list[i - 1] @ Psi[h - i]
Psi[h] = acc
return Psi
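A quick sanity check of the recursion (our test, using small hypothetical matrices): from the identities in section 2, (\Psi_1 = A_1 + M_1), and with (q=1) also (\Psi_2 = A_1\Psi_1 + A_2).

A1_chk = np.array([[0.5, 0.1], [0.0, 0.4]])
A2_chk = np.array([[0.1, 0.0], [0.0, 0.1]])
M1_chk = np.array([[0.3, 0.0], [0.2, 0.3]])
Psi_chk = varma_irf([A1_chk, A2_chk], [M1_chk], steps=2)
assert np.allclose(Psi_chk[1], A1_chk + M1_chk)               # Psi_1 = A_1 + M_1
assert np.allclose(Psi_chk[2], A1_chk @ Psi_chk[1] + A2_chk)  # Psi_2 = A_1 Psi_1 + A_2 (M_2 = 0)
print("IRF recursion checks pass")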
def varma_forecast_paths(
y_hist: np.ndarray,
e_hist: np.ndarray,
A_list: list[np.ndarray],
M_list: list[np.ndarray],
c: np.ndarray,
Sigma: np.ndarray,
steps: int,
n_paths: int = 500,
seed: int = 0,
) -> np.ndarray:
"""Monte Carlo forecast paths for VARMA(p,q), conditional on observed history.
Notes:
- We treat past innovations e_hist as known (in real estimation they'd be filtered).
- Future innovations are sampled from N(0,Sigma).
Returns: paths (n_paths, steps, k)
"""
rng_local = np.random.default_rng(seed)
y_hist = np.asarray(y_hist, dtype=float)
e_hist = np.asarray(e_hist, dtype=float)
T, k = y_hist.shape
if e_hist.shape != (T, k):
raise ValueError("e_hist must match y_hist shape")
p = len(A_list)
q = len(M_list)
max_lag = max(p, q)
if T < max_lag:
raise ValueError("Need enough history for max(p,q)")
L = np.linalg.cholesky(Sigma)
paths = np.zeros((n_paths, steps, k), dtype=float)
for s in range(n_paths):
y_ext = np.zeros((T + steps, k), dtype=float)
e_ext = np.zeros((T + steps, k), dtype=float)
y_ext[:T] = y_hist
e_ext[:T] = e_hist
e_ext[T:] = rng_local.standard_normal((steps, k)) @ L.T
for t in range(T, T + steps):
yt = c.copy()
for i, A in enumerate(A_list, start=1):
yt += A @ y_ext[t - i]
yt += e_ext[t]
for j, M in enumerate(M_list, start=1):
yt += M @ e_ext[t - j]
y_ext[t] = yt
paths[s] = y_ext[T:]
return paths
5) Synthetic example: simulate a VARMA(2,1)#
We will:
simulate a 2D VARMA process,
forecast multiple steps ahead via Monte Carlo,
compute impulse responses (\Psi_h) (shock propagation).
Because estimation of VARMA is itself a topic (state space + MLE), we treat the parameters as known in this notebook.
k = 2
A1 = np.array([[0.55, 0.15],
[-0.10, 0.35]])
A2 = np.array([[-0.25, 0.05],
[0.02, -0.15]])
A = [A1, A2]
# MA(1): past shocks echo into the next step
M1 = np.array([[0.60, 0.00],
[0.20, 0.30]])
M = [M1]
c = np.array([0.05, -0.02])
Sigma = np.array([[1.0, 0.4],
[0.4, 0.8]])
print("VAR stability (AR part):", is_var_stable(A))
T = 800
y, e = simulate_varma(n=T, A_list=A, M_list=M, c=c, Sigma=Sigma, burnin=300, seed=SEED)
print(y.shape, e.shape)
t = np.arange(T)
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05,
subplot_titles=["y[0]", "y[1]"])
for i in range(2):
fig.add_trace(go.Scatter(x=t, y=y[:, i], mode="lines", name=f"y{i}"), row=i+1, col=1)
fig.update_layout(height=450, title="Simulated VARMA(2,1) series")
fig.show()
VAR stability (AR part): True
(800, 2) (800, 2)
6) Baseline: fitting a VAR(p) by OLS#
A common practical baseline is to fit a VAR and check residual autocorrelation. If the residuals show serial correlation, a VARMA (or a higher-order VAR) may be warranted.
Here we fit a VAR(2) by stacked OLS. Because the true process is VARMA(2,1), the fitted (A_i) absorb part of the MA dynamics, so the estimates below differ noticeably from the true (A_1, A_2).
fit = var_ols_fit(y, p=2, add_intercept=True)
print("c_hat:", fit["c"])
print("A1_hat:")
print(fit["A"][0])
print("A2_hat:")
print(fit["A"][1])
print("Residual covariance (Sigma_hat):")
print(fit["Sigma"])
c_hat: [-0.0532 -0.0974]
A1_hat:
[[0.9693 0.1923]
[0.0153 0.6448]]
A2_hat:
[[-0.4924 -0.062 ]
[-0.0421 -0.3327]]
Residual covariance (Sigma_hat):
[[1.0675 0.4556]
[0.4556 0.8706]]
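The baseline text suggests checking residual autocorrelation, but var_ols_fit returns no such diagnostic. A minimal sketch (resid_autocorr is our helper) computes lag-(h) autocorrelation matrices of the residuals; entries well away from zero at low lags are the footprint of the MA component a pure VAR cannot absorb:

def resid_autocorr(E: np.ndarray, max_lag: int = 5) -> np.ndarray:
    """Lag-h autocorrelation matrices R_1..R_max_lag of residuals E with shape (T, k)."""
    E = E - E.mean(axis=0)
    T = E.shape[0]
    C0 = (E.T @ E) / T               # lag-0 covariance
    d = np.sqrt(np.diag(C0))         # marginal standard deviations
    R = np.zeros((max_lag, *C0.shape))
    for h in range(1, max_lag + 1):
        Ch = (E[h:].T @ E[:-h]) / T  # lag-h cross-covariance
        R[h - 1] = Ch / np.outer(d, d)
    return R

for h, Rh in enumerate(resid_autocorr(fit["resid"], max_lag=3), start=1):
    print(f"residual autocorrelation, lag {h}:\n{Rh}")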
7) Multivariate forecasts (Monte Carlo fan charts)#
We forecast (H) steps ahead from a cut point (T_0). Because we simulated the process, we know the realized past innovations (\varepsilon_t) up to (T_0), so we can condition on them.
For each future path, we sample future innovations and roll the model forward.
T0 = 650
H = 60
y_hist = y[:T0]
e_hist = e[:T0]
y_true = y[T0:T0+H]
paths = varma_forecast_paths(
y_hist=y_hist,
e_hist=e_hist,
A_list=A,
M_list=M,
c=c,
Sigma=Sigma,
steps=H,
n_paths=800,
seed=SEED + 123,
)
q_lo, q_med, q_hi = np.quantile(paths, [0.05, 0.5, 0.95], axis=0)
x_past = np.arange(T0)
x_fut = np.arange(T0, T0 + H)
fig = make_subplots(rows=2, cols=1, shared_xaxes=True, vertical_spacing=0.05,
subplot_titles=["Forecast y[0]", "Forecast y[1]"])
for i in range(2):
# history
fig.add_trace(go.Scatter(x=x_past, y=y_hist[:, i], mode="lines", name=f"y{i} history",
line=dict(color="rgba(0,0,0,0.55)")), row=i+1, col=1)
# truth
fig.add_trace(go.Scatter(x=x_fut, y=y_true[:, i], mode="lines", name=f"y{i} truth",
line=dict(color="rgba(0,0,0,1.0)", width=2)), row=i+1, col=1)
# interval
fig.add_trace(go.Scatter(x=x_fut, y=q_hi[:, i], mode="lines", line=dict(width=0),
showlegend=False, hoverinfo="skip"), row=i+1, col=1)
fig.add_trace(go.Scatter(x=x_fut, y=q_lo[:, i], mode="lines", line=dict(width=0),
fill="tonexty", fillcolor="rgba(0,100,200,0.18)",
name=f"90% PI y{i}", hoverinfo="skip"), row=i+1, col=1)
# median
fig.add_trace(go.Scatter(x=x_fut, y=q_med[:, i], mode="lines", name=f"median y{i}",
line=dict(color="rgba(0,100,200,1.0)", dash="dash")), row=i+1, col=1)
fig.add_vline(x=T0, line_width=1, line_dash="dot", line_color="gray")
fig.update_layout(height=520, title="VARMA multivariate forecasts (median + 90% interval)")
fig.show()
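Under Gaussian innovations the predictive distribution at each horizon is symmetric, so the Monte Carlo median should track the conditional mean. As a cross-check, the mean path can be computed directly by rolling the recursion forward with all future innovations set to zero (varma_mean_forecast is our sketch, under the same known-parameters assumption as above):

def varma_mean_forecast(y_hist, e_hist, A_list, M_list, c, steps):
    """Conditional mean forecast: future innovations replaced by their mean, zero."""
    T, k = y_hist.shape
    y_ext = np.concatenate([y_hist, np.zeros((steps, k))], axis=0)
    e_ext = np.concatenate([e_hist, np.zeros((steps, k))], axis=0)
    for t in range(T, T + steps):
        yt = c.copy()
        for i, A in enumerate(A_list, start=1):
            yt += A @ y_ext[t - i]
        for j, M in enumerate(M_list, start=1):
            yt += M @ e_ext[t - j]  # uses observed innovations while t - j < T
        y_ext[t] = yt
    return y_ext[T:]

mean_fc = varma_mean_forecast(y_hist, e_hist, A, M, c, steps=H)
print("max |MC median - mean forecast|:", np.max(np.abs(q_med - mean_fc)))

The gap shrinks as n_paths grows, since the median and mean of a Gaussian predictive distribution coincide.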
8) Shock propagation (impulse responses)#
Compute (\Psi_h) for (h=0,\dots,H_{\mathrm{irf}}). Each (\Psi_h) is a (k\times k) matrix:
columns: which variable is shocked
rows: which variable responds
We plot the responses to a 1-s.d. shock in each component.
H_irf = 25
Psi = varma_irf(A_list=A, M_list=M, steps=H_irf)
# scale shocks by marginal std
shock_std = np.sqrt(np.diag(Sigma))
fig = make_subplots(
rows=2,
cols=2,
shared_xaxes=True,
shared_yaxes=False,
horizontal_spacing=0.12,
vertical_spacing=0.10,
subplot_titles=[
"response y0 to shock e0",
"response y0 to shock e1",
"response y1 to shock e0",
"response y1 to shock e1",
],
)
h = np.arange(H_irf + 1)
for shock_j in range(2):
u = np.zeros(2)
u[shock_j] = shock_std[shock_j]
    resp = Psi @ u  # (H_irf + 1, k)
# response variable 0
fig.add_trace(go.Scatter(x=h, y=resp[:, 0], mode="lines", name=f"shock {shock_j}"), row=1, col=shock_j+1)
# response variable 1
fig.add_trace(go.Scatter(x=h, y=resp[:, 1], mode="lines", showlegend=False), row=2, col=shock_j+1)
fig.update_layout(height=520, title="VARMA impulse responses (1-s.d. innovations)")
fig.update_xaxes(title_text="horizon")
fig.show()
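One caveat on the plots above: because (\Sigma) has a 0.4 off-diagonal, the per-component 1-s.d. shocks are correlated rather than independent disturbances. A common alternative (not used in the plots above) is orthogonalized impulse responses via the Cholesky factor (\Sigma = PP^\top); a minimal sketch:

P_chol = np.linalg.cholesky(Sigma)  # Sigma = P P^T, P lower triangular
Psi_orth = Psi @ P_chol             # (H_irf + 1, k, k): responses to uncorrelated unit shocks
print("Impact responses Psi_orth[0] (equals P_chol):")
print(Psi_orth[0])

Note that Cholesky orthogonalization depends on variable ordering: with this (P), a shock to the first variable moves the second on impact, but not vice versa.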
9) Exercises#
Set (M_1=0) (so VARMA becomes VAR) and compare impulse responses.
Increase (q) and observe how the MA part changes early-horizon dynamics.
Fit higher-order VAR models to VARMA data and compare forecast accuracy vs parameter count.
References#
Lütkepohl, New Introduction to Multiple Time Series Analysis
Hamilton, Time Series Analysis
Tsay, Multivariate Time Series Analysis